{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "view-in-colab"
      },
      "source": [
        "<a href=\"https://colab.research.google.com/github/pathwaycom/pathway/blob/main/examples/projects/from_jupyter_to_deploy/part3_kafka_data_streamer.ipynb\" target=\"_parent\"><img src=\"https://pathway.com/assets/colab-badge.svg\" alt=\"Run In Colab\" class=\"inline\"/></a>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "notebook-instructions"
      },
      "source": [
        "# Installing Pathway with Python 3.10+\n",
        "\n",
        "In the cell below, we install Pathway into a Python 3.10+ Linux runtime.\n",
        "\n",
        "> **If you are running in Google Colab, please run the colab notebook (Ctrl+F9)**, disregarding the 'not authored by Google' warning.\n",
        "> \n",
        "> **The installation and loading time is less than 1 minute**.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "pip-installation-pathway"
      },
      "outputs": [],
      "source": [
        "%%capture --no-display\n",
        "!pip install --prefer-binary pathway"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "1",
      "metadata": {},
      "source": [
        "# Part 3: Kafka integration and alerts forwarding (Producer)\n",
        "\n",
        "This notebook is a helper notebook for the third part of the tutorial [From interactive data exploration to deployment](/developers/user-guide/exploring-pathway/from-jupyter-to-deploy#part-3-kafka-integration-and-alerts-forwarding)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "2",
      "metadata": {},
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "--2024-06-12 08:21:04--  https://gist.githubusercontent.com/janchorowski/e351af72ecd8d206a34763a428826ab7/raw/ticker.csv\r\n",
            "Resolving gist.githubusercontent.com (gist.githubusercontent.com)... "
          ]
        },
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "185.199.109.133, 185.199.108.133, 185.199.111.133, ...\r\n",
            "Connecting to gist.githubusercontent.com (gist.githubusercontent.com)|185.199.109.133|:443... connected.\r\n",
            "HTTP request sent, awaiting response... 200 OK\r\n",
            "Length: 1253370 (1.2M) [text/plain]\r\n",
            "Saving to: ‘ticker.csv’\r\n",
            "\r\n",
            "\r\n",
            "ticker.csv            0%[                    ]       0  --.-KB/s               "
          ]
        },
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "\r\n",
            "ticker.csv          100%[===================>]   1.19M  --.-KB/s    in 0.02s   \r\n",
            "\r\n",
            "2024-06-12 08:21:04 (52.2 MB/s) - ‘ticker.csv’ saved [1253370/1253370]\r\n",
            "\r\n"
          ]
        }
      ],
      "source": [
        "# Download CSV file\n",
        "!wget -nc https://gist.githubusercontent.com/janchorowski/e351af72ecd8d206a34763a428826ab7/raw/ticker.csv"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "3",
      "metadata": {},
      "source": [
        "## Writing messages to Kafka"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "4",
      "metadata": {
        "lines_to_next_cell": 2
      },
      "outputs": [],
      "source": [
        "import pathway as pw\n",
        "\n",
        "# To use advanced features with Pathway Scale, get your free license key from\n",
        "# https://pathway.com/features and paste it below.\n",
        "# To use Pathway Community, comment out the line below.\n",
        "pw.set_license_key(\"demo-license-key-with-telemetry\")\n",
        "\n",
        "fname = \"ticker.csv\"\n",
        "schema = pw.schema_from_csv(fname)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "5",
      "metadata": {},
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "class DataSchema(pw.Schema):\n",
            "    ticker: str\n",
            "    open: float\n",
            "    high: float\n",
            "    low: float\n",
            "    close: float\n",
            "    volume: float\n",
            "    vwap: float\n",
            "    t: int\n",
            "    transactions: int\n",
            "    otc: str\n"
          ]
        }
      ],
      "source": [
        "print(schema.generate_class(class_name=\"DataSchema\"))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "6",
      "metadata": {},
      "outputs": [],
      "source": [
        "# The schema definition is autogenerated\n",
        "class DataSchema(pw.Schema):\n",
        "    ticker: str\n",
        "    open: float\n",
        "    high: float\n",
        "    low: float\n",
        "    close: float\n",
        "    volume: float\n",
        "    vwap: float\n",
        "    t: int\n",
        "    transactions: int\n",
        "    otc: str\n",
        "\n",
        "\n",
        "data = pw.demo.replay_csv(fname, schema=DataSchema, input_rate=1000)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "7",
      "metadata": {},
      "outputs": [],
      "source": [
        "# TODO: please set appropriaye values for KAFKA_ENDPOINT, KAFKA_USERNAME, and KAFKA_PASSWORD\n",
        "rdkafka_producer_settings = {\n",
        "    \"bootstrap.servers\": \"KAFKA_ENDPOINT:9092\",\n",
        "    \"security.protocol\": \"sasl_ssl\",\n",
        "    \"sasl.mechanism\": \"SCRAM-SHA-256\",\n",
        "    \"sasl.username\": \"KAFKA_USERNAME\",\n",
        "    \"sasl.password\": \"KAFKA_PASSWORD\",\n",
        "}\n",
        "\n",
        "pw.io.kafka.write(data, rdkafka_producer_settings, topic_name=\"ticker\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "8",
      "metadata": {},
      "outputs": [],
      "source": [
        "pw.run()"
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": "Python 3 (ipykernel)",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.11.9"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 5
}
